-
Notifications
You must be signed in to change notification settings - Fork 411
[CELEBORN-1577][Phase1] Storage quota should support interrupt shuffle. #2801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@AngersZhuuuu @RexXiong Can you help review? |
| if (response.statusCode == StatusCode.SUCCESS) { | ||
| logDebug("Successfully send app heartbeat.") | ||
| workerStatusTracker.handleHeartbeatResponse(response) | ||
| checkQuotaExceeds(response.checkQuotaResponse) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add a flag to tell spark whether cancel the stage when quota exceeds?
926bfe8 to
f07108e
Compare
| new util.ArrayList[WorkerInfo]( | ||
| (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava))) | ||
| (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava), | ||
| CheckQuotaResponse(isAvailable = true, ""))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should actually check quota for the application.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently QuotaManager only support check quota for user, at next PR, we will support check quota for application, so we don’t need to modify rpc proto at this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
f07108e to
852b6db
Compare
|
cc @RexXiong PTAL. |
RexXiong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Only minor comments
| def clientShuffleMapPartitionSplitEnabled = get(CLIENT_SHUFFLE_MAPPARTITION_SPLIT_ENABLED) | ||
| def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED) | ||
| def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW) | ||
| def clientQuotaInterruptShuffleEnable = get(CLIENT_QUOTA_INTERRUPT_SHUFFLE_ENABLED) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clientQuotaInterruptShuffleEnable -》 clientQuotaInterruptShuffleEnabled
| import org.apache.celeborn.common.rpc.{ClientSaslContextBuilder, RpcSecurityContext, RpcSecurityContextBuilder} | ||
| import org.apache.celeborn.common.rpc.netty.{LocalNettyRpcCallContext, RemoteNettyRpcCallContext} | ||
| import org.apache.celeborn.common.util.{JavaUtils, PbSerDeUtils, ThreadUtils, Utils} | ||
| // Can Remove this if celeborn don't support scala211 in future |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't remove this
| .createWithDefault(1) | ||
|
|
||
| val CLIENT_QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] = | ||
| buildConf("celeborn.client.quota.interruptShuffle.enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should rename the configuration to celeborn.quota.interruptShuffle.enabled, as 'quota' represents an independent configuration namespace. It would also be appropriate to move this setting to the quota section.
852b6db to
92237ed
Compare
|
cc @RexXiong Done, thanks for your review. |
|
|
||
| val QUOTA_INTERRUPT_SHUFFLE_ENABLED: ConfigEntry[Boolean] = | ||
| buildConf("celeborn.quota.interruptShuffle.enabled") | ||
| .categories("quota", "master", "client") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems this configuration never used at master
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2801 +/- ##
==========================================
- Coverage 32.37% 32.27% -0.09%
==========================================
Files 328 329 +1
Lines 19387 19483 +96
Branches 1747 1749 +2
==========================================
+ Hits 6275 6287 +12
- Misses 12769 12853 +84
Partials 343 343 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks! merge to main(v0.6.0)
What changes were proposed in this pull request?
Support interrupt shuffle on client side.
I will develop the following functions in order
Why are the changes needed?
The current storage quota logic can only limit new shuffles, and cannot limit the writing of existing shuffles. In our production environment, there is such an scenario: the cluster is small, but the user's app single shuffle is large which occupied disk resources, we want to interrupt those shuffle.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unable to test this part independently, Additional tests will be added after completing the second part.